kafka删除topic消息的四种方式

您所在的位置:网站首页 kafka 队列名 kafka删除topic消息的四种方式

kafka删除topic消息的四种方式

2024-07-11 05:59| 来源: 网络整理| 查看: 265

方法一:快速配置删除法(简单粗暴,如果这个主题有程序还在消费者,此时KAFKA就game over)

1.kafka启动之前,在server.properties配置delete.topic.enable=true

2.执行命令bin/kafka-topics.sh --delete --topic test --zookeeper zk:2181或者使用kafka-manager集群管理工具删除

注意:如果kafka启动之前没有配置delete.topic.enable=true,topic只会标记为marked for deletion,加上配置,重启kafka,之前的topick就真正删除了。

 

方法二:设置删除策略(简单粗暴,如果这个消息有程序还在消费者,此时KAFKA就game over)

1.kafka启动之前,在server.properties配置

#日志清理策略选择有:delete和compact主要针对过期数据的处理,或是日志文件达到限制的额度,会被 topic创建时的指定参数覆盖 log.cleanup.policy = delete # 注意:下面有两种配置,一种是基于时间的策略,另种是基于日志文件大小的策略,两种策略同是配置的话,只要满足其中种策略,则触发Log删除的操作。删除操作总是先删除最旧的日志 # 消息在Kafka中保存的时间,168小时之前的1og, 可以被删除掉,根据policy处理数据。 log.retention.hours=4 # 当剩余空间低于log.retention.bytes字节,则开始删除1og log.retention.bytes=37580963840 # 每隔300000ms, logcleaner线程将检查一次,看是否符合上述保留策略的消息可以被删除 log.retention.check.interval.ms=1000

 

方法三:手动删除法(不推荐)(简单粗暴,如果这个消息有程序还在消费者,此时KAFKA就game over)

前提:不允许更改server.properties配置

1.删除zk下面topic(test)

启动bin/zkCli.sh ls /brokers/topics rmr /brokers/topics/test ls /brokers/topics 查topic是否删除:bin/kafka-topics.sh --list --zookeeper zk:2181

2.删除各broker下topic数据,默认目录为/tmp/kafka-logs  

 

方法四:偏移量(看起来你最友好,会程序的你推荐)

package com.censoft.kafkaAdmin; import org.apache.kafka.clients.admin.*; import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.TopicPartition; import java.sql.*; import java.util.HashMap; import java.util.Map; import java.util.Properties; import java.util.concurrent.ExecutionException; /** * @author zy Zhang * @version : 1.0 * @Description * @since 2020/7/13 16:02 */ public class DeleteReordsByOffset { public static void main(String[] args) throws ClassNotFoundException { // 1.创建kafkaAdminClient Properties properties = new Properties(); properties.put("bootstrap.servers","192.168.27.111:9092"); AdminClient kafkaAdminClient = KafkaAdminClient.create(properties); // 2.从数据库获取需要删除的消息 Class.forName("com.mysql.jdbc.Driver"); Map recordsToDelete = new HashMap(); String url = "jdbc:mysql://localhost:3306/test?useSSL=false;useUnicode=true;characterEncoding=UTF-8"; String user = "root"; String password = "123456"; Connection conn = null; Statement statement = null; ResultSet res = null; String sql = "SELECT Topic, KafkaPartition, UntilOffset FROM Kafka_Offset;"; try { conn = DriverManager.getConnection(url, user, password); statement = conn.createStatement(); res = statement.executeQuery(sql); if (res != null) { while (res.next()) { String topic = res.getString("Topic"); Integer partition = res.getInt("KafkaPartition"); Long offset = res.getLong("UntilOffset"); TopicPartition topicPartition = new TopicPartition(topic, partition); RecordsToDelete recordsToDelete1 = RecordsToDelete.beforeOffset(offset); recordsToDelete.put(topicPartition, recordsToDelete1); } } } catch (SQLException e) { e.printStackTrace(); } finally { if (statement != null) { try { statement.close(); } catch (SQLException e) { e.printStackTrace(); } } if (conn != null) { try { conn.close(); } catch (SQLException e) { e.printStackTrace(); } } } // 3.执行删除 DeleteRecordsResult result = kafkaAdminClient.deleteRecords(recordsToDelete); Map lowWatermarks = result.lowWatermarks(); try { for (Map.Entry entry : lowWatermarks.entrySet()) { System.out.println(entry.getKey().topic() + " " + entry.getKey().partition() + " " + entry.getValue().get().lowWatermark()); } } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } kafkaAdminClient.close(); } }

2020-11-27 补充说明:

目前发现通过这种方法起到的效果是: topic的起始偏移量会被定位到传入的recordsToDelete指定的位置 但是并没有将磁盘中存储的数据删除 如果我找到在磁盘删除的方法会继续更新,看下面

2020-11-30 补充说明: 我重新进行了测试,发现使用kafka-delete-records.sh或者kafkaAdminClient.deleteRecords()方法还有其他约束条件: 首先就是log文件自身有大小设置,对应配置文件中log.segment.bytes,在没有达到这个大小的时候是不会创建下一个log文件的。

eg: test-0下有三个log文件 00000000000000000000.log, 00000000000000000036.log, 00000000000000000136.log 我们修改起始偏移量=100 那么deleteLogStartOffsetBreachedSegments运行时会判定00000000000000000000.log是可以删除的

在原先测试时,log.segment.bytes=1G了,这造成了很难观测到数据从硬盘删除 本次测试,我将log.segment.bytes修改为了1M



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3